package com.kool.kmqtt.server.task;

import com.kool.kmqtt.server.session.SessionContext;
import com.kool.kmqtt.server.session.SessionHolder;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.DelayQueue;

@Slf4j
public class ConnectCheckTask implements Runnable {
    /**
     * 用于处理连接超时的延时队列
     */
    private final DelayQueue<ConnectDelayed> connectTimeoutQueue = new DelayQueue<>();

    /**
     * 连接检查
     *
     * @param connectDelayed
     */
    public void check(ConnectDelayed connectDelayed) {
        connectTimeoutQueue.offer(connectDelayed);
    }

    @Override
    public void run() {
        try {
            while (true) {
                try {
                    ConnectDelayed delayed = connectTimeoutQueue.take();
                    //如果没有收到CONNECT报文，服务端应该关闭这个连接
                    String sessionId = delayed.getSessionId();
                    SessionContext sessionContext = SessionHolder.getInstance().getBySessionId(sessionId);
                    if (sessionContext != null && sessionContext.getClientId() == null) {
                        //客户端id为空表示没有收到CONNECT报文
                        SessionHolder.close(sessionContext);
                    }
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            }
        } catch (Throwable t) {
            log.error(t.getMessage(), t);
        }

    }
}
